All files / remote datastore.ts

100% Statements 43/43
100% Branches 0/0
100% Functions 16/16
100% Lines 38/38
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134                                    2x       2x       2x                                         2x   603x 603x 603x 603x     2x 633x               2x 630x               132x 132x   98x   132x       96x       84x 84x   84x   84x       84x 84x 84x 84x   84x 84x 84x 84x 84x   84x         132x   132x 132x         84x         84x 84x             2x  
/**
 * Copyright 2017 Google Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
 
import * as api from '../protos/firestore_proto_api';
import { CredentialsProvider } from '../api/credentials';
import { maybeDocumentMap } from '../model/collections';
import { MaybeDocument } from '../model/document';
import { DocumentKey } from '../model/document_key';
import { Mutation, MutationResult } from '../model/mutation';
import { assert } from '../util/assert';
import { AsyncQueue } from '../util/async_queue';
 
import { Connection } from './connection';
import {
  PersistentListenStream,
  PersistentWriteStream
} from './persistent_stream';
import { JsonProtoSerializer } from './serializer';
 
// The generated proto interfaces for these class are missing the database
// field. So we add it here.
// TODO(b/36015800): Remove this once the api generator is fixed.
interface BatchGetDocumentsRequest extends api.BatchGetDocumentsRequest {
  database?: string;
}
interface CommitRequest extends api.CommitRequest {
  database?: string;
}
 
/**
 * Datastore is a wrapper around the external Google Cloud Datastore grpc API,
 * which provides an interface that is more convenient for the rest of the
 * client SDK architecture to consume.
 */
export class Datastore {
  constructor(
    private queue: AsyncQueue,
    private connection: Connection,
    private credentials: CredentialsProvider,
    private serializer: JsonProtoSerializer
  ) {}
 
  newPersistentWriteStream(): PersistentWriteStream {
    return new PersistentWriteStream(
      this.queue,
      this.connection,
      this.credentials,
      this.serializer
    );
  }
 
  newPersistentWatchStream(): PersistentListenStream {
    return new PersistentListenStream(
      this.queue,
      this.connection,
      this.credentials,
      this.serializer
    );
  }
 
  commit(mutations: Mutation[]): Promise<MutationResult[]> {
    const params: CommitRequest = {
      database: this.serializer.encodedDatabaseId,
      writes: mutations.map(m => this.serializer.toMutation(m))
    };
    return this.invokeRPC<CommitRequest, api.CommitResponse>(
      'Commit',
      params
    ).then(response => {
      return this.serializer.fromWriteResults(response.writeResults);
    });
  }
 
  lookup(keys: DocumentKey[]): Promise<MaybeDocument[]> {
    const params: BatchGetDocumentsRequest = {
      database: this.serializer.encodedDatabaseId,
      documents: keys.map(k => this.serializer.toName(k))
    };
    return this.invokeStreamingRPC<
      BatchGetDocumentsRequest,
      api.BatchGetDocumentsResponse
    >('BatchGetDocuments', params).then(response => {
      let docs = maybeDocumentMap();
      response.forEach(proto => {
        const doc = this.serializer.fromMaybeDocument(proto);
        docs = docs.insert(doc.key, doc);
      });
      const result: MaybeDocument[] = [];
      keys.forEach(key => {
        const doc = docs.get(key);
        assert(!!doc, 'Missing entity in write response for ' + key);
        result.push(doc!);
      });
      return result;
    });
  }
 
  /** Gets an auth token and invokes the provided RPC. */
  private invokeRPC<Req, Resp>(rpcName: string, request: Req): Promise<Resp> {
    // TODO(mikelehen): Retry (with backoff) on token failures?
    return this.credentials.getToken(/*forceRefresh=*/ false).then(token => {
      return this.connection.invokeRPC<Req, Resp>(rpcName, request, token);
    });
  }
 
  /** Gets an auth token and invokes the provided RPC with streamed results. */
  private invokeStreamingRPC<Req, Resp>(
    rpcName: string,
    request: Req
  ): Promise<Resp[]> {
    // TODO(mikelehen): Retry (with backoff) on token failures?
    return this.credentials.getToken(/*forceRefresh=*/ false).then(token => {
      return this.connection.invokeStreamingRPC<Req, Resp>(
        rpcName,
        request,
        token
      );
    });
  }
}